package com.example.demo.table;

import com.example.demo.entity.Event;
import com.example.demo.mapper.EventMapper;
import com.example.demo.stream.source.DbSource;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.$;

/**
 * 测试tableApi
 *
 * @date： 2022/6/13
 * @author: wbx
 */
public class TableApi {
    /* public static void main(String[] args) throws Exception {


 // create environments of both APIs
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(1);
         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

 // create a DataStream
         DataStream<String> dataStream = env.fromElements("Alice", "Bob", "John");

 // interpret the insert-only DataStream as a Table
         Table inputTable = tableEnv.fromDataStream(dataStream);

 // register the Table object as a view and query it
         tableEnv.createTemporaryView("InputTable", inputTable);
         Table resultTable = tableEnv.sqlQuery("SELECT UPPER(f0) FROM InputTable");

         tableEnv.from("InputTable").executeInsert("OutputTable");

         tableEnv.executeSql("INSERT INTO OutputTable SELECT * FROM InputTable");

         // interpret the insert-only Table as a DataStream again
         DataStream<Row> resultStream = tableEnv.toDataStream(resultTable);

         // add a printing sink and execute in DataStream API
         resultStream.print("tableApi查询：");
         env.execute();

 // execute with explicit sink
 //        tableEnv.from("InputTable").executeInsert("OutputTable");

         tableEnv.executeSql("INSERT INTO OutputTable SELECT * FROM InputTable");

         tableEnv.createStatementSet()
                 .addInsert("InputTable", tableEnv.from("OutputTable"))
 //                .add(tableEnv.from("InputTable").insertInto("OutputTable2"))
                 .execute();

         tableEnv.createStatementSet()
                 .addInsertSql("INSERT INTO OutputTable SELECT * FROM InputTable")
                 .addInsertSql("INSERT INTO OutputTable2 SELECT * FROM InputTable")
                 .execute();

 // execute with implicit local sink

         tableEnv.from("InputTable").execute().print();

         tableEnv.executeSql("SELECT * FROM InputTable").print();

     }*/
    public static void main(String[] args) {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        DbSource<Event> eventDbSource = new DbSource<>(EventMapper.class, null);
        DataStream<Event> eventDataStreamSource = executionEnvironment.addSource(eventDbSource).returns(Event.class);
        SingleOutputStreamOperator<Row> map = eventDataStreamSource.map(new RichMapFunction<Event, Row>() {
            @Override
            public Row map(Event value) throws Exception {
                return Row.of(value);
            }
        });
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(executionEnvironment);
        Table table = tableEnvironment.fromChangelogStream(map);
        table.printSchema();

    }
}
